package com.myflink.day05;

import com.myflink.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author Shelly An
 * @create 2020/9/24 8:48
 */
public class Watermark_Parallalism {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 4444)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                //并行度为1时：wm>=25 关闭窗口并计算  wm = et(max) - 3 因此当et = 28时，关闭窗口并计算
                //并行度为2时：木桶原理，不同的并行以最小的wm为准，min(wm1,wm2) >= 25时才会触发关闭窗口并计算
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(WaterSensor element) {
                        return element.getTs() * 1000L;
                    }
                });

        SingleOutputStreamOperator<Long> resultDS = sensorDS
                .keyBy(WaterSensor::getId)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
                        out.collect(elements.spliterator().estimateSize());
                    }
                });
        
        resultDS.print("result");

        env.execute();
    }
}
